AddIns and Scripting

Roslyn .NET Scripting is an easy way to write custom functionality with C# scripts that are compiled during startup.

You can either write inline scripts in Foopipes.yml or load .csx scripts from a file or an URL.

Addins

Addins are scripts that are loaded, compiled and run at startup. Typically an add-in registers new service types and tasks which then are available for the pipelines.

addins:
  - url: "https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx"
  - script: |
      PipelineTask("nospaces").Json((context, json, ct) =>
         { 
           json.Data["name"] = json.Data["name"].Replace(" ", "");
           return json;
         });

services:
  mailgun:
    type: mailgun
    apiBaseUrl: https://api.mailgun.net/v3/sandbox5ded26xxxxxxxxxxxxb8.mailgun.org
    apiKey: key-3a56bxxxxxxxxxxxxxxx5c
    defaultFrom: me@mydomain.com

pipelines:
  - 
    when: 
      - queue: started
    from:
      - http: "https://jsonplaceholder.typicode.com/posts"
    do:
      - nospaces
    to:
      - log
    error:
      - { mailgun.send, to: me@mydomain.com, subject: Error, text: An error occured }

Community Addins

Addins created by the community is available as a public repository on Github: https://github.com/AreteraAB/Foopipes.Addins.

When loading a community Addin, use this URL format: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx.

Consider using a tag or commit hash instead of the latest version: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/47e1e6d74f2546673b74f929f9ebf74ca56afae5/Tail/Tail.csx.

Pull requests are welcome!

Registering tasks

Register custom tasks by passing a callback to the method PipelineTask(string name).

Pipeline tasks can be json and/or binary or dynamic, depending what kind of data they're able to handle.

PipelineTask("mytask").Json(async (context, json, cancellationToken) => 
    {
        // Do something here with json data
        return json;
    });

PipelineTask("mytask").Binary(async (context, data, cancellationToken) =>
    {
        // Do something here with binary data
        return data;
    });

Registering services

Custom services are the best way to keep state. You can either register a service type which later can be referenced in the configuration file or a service instance which will be a named singleton.

using Foopipes.Abstractions.Services;

class MyService : ServiceBase
{
    public string MyConfigValue => Config["myConfigValue"];
    private int _counter = 0;

    public int IncrementCounter()
    {
        return Interlocked.Increment(ref _counter);
    }
}

Service.Register("myserviceType", typeof(MyService));

Create an instance and configure your service in the configuration file:

services:
  myserviceInstanceName:
    type: myserviceType
    myConfigValue: hello

In your tasks, you can get hold of a service instance like this:

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        var service = await context.GetService<MyService>(defaultName: "myserviceInstanceName");
        json.Data["counter"] = service.IncrementCounter();
        return json;
    });

Observer/observable pattern

If your service implements IObservableService you can emit events that trigger pipelines. Very powerful combined with IRunnableService and System.Reactive.

#r "System.Reactive"

using System.Reactive.Subjects;
using Foopipes.Abstractions.Services;

class MyObservableService : ServiceBase, IObservableService, IRunnableService
{
    private Subject<ServiceEvent> _subject = new Subject<ServiceEvent>();

    // IObservableService 
    public IObservable<ServiceEvent> Observable => _subject;

    // IRunnableService
    public async Task Run(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(1000, cancellationToken);

            var metadata = JObject.FromObject(new
            {
                currentTime = DateTime.Now,
            });

            var serviceEvent = new ServiceEvent(this, metadata, new[] { new BinaryData(new byte[]{ 0x42} }));
            _subject.OnNext(serviceEvent);
        }
    }
}

Service.Register("myObservableService", typeof(MyObservableService));
services: 
  myObservableService: 
    type: myObservableService

pipelines: 
  - 
    when: 
      - myObservableService
    to:
      - log 

Invoking tasks from a task

You can write tasks that invoke other tasks.

PipelineTask("sendGreeting").Json(async (context, json, cancellationToken) =>
    {
        var data = JObject.FromObject(new
        {
            greeting = "hello " + await context.GetExpandedConfigValue("name")
        });

        var config = new Dictionary<string, string>
            {
                {"url", "https://www.myservice.com/api" },
                {"method", "post"},
                {"body", "formUrlEncoded"}
            };

        var r = await context.RunTask("http").WithData(data).WithArguments(config).Invoke(cancellationToken);
        return json;
    });

Invoke with:

  do:
    - { sendGreeting, name: "Foo #{lastname}" }

Returning results

Task callbacks can return json and/or binary data.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        return new JsonData( JObject.FromObject(new { hello="world"}) );
    });
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        return new BinaryData(new byte[]{0x42});
    });

It's also possible to return multiple results.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        return new ProcessJsonResult( new[]{ 
            JObject.FromObject(new { hello="world1" }),
            JObject.FromObject(new { hello="world2" }),
        });
    });

Referencing other assemblies

You can reference assemblies using the #r syntax.

#r "System.Security.Cryptography.Csp"

using System.Security.Cryptography;

var _aes = Aes.Create();

PipelineTask("decryptstring").Binary(async (context, binary, cancellationToken) =>
    {
        using (var decryptor = _aes.CreateDecryptor(key, iv))
        {
            // etc etc
        }
        return JObject.FromObject(new { value=decryptedData });
    });

Currently, it is not possible to reference Nuget assembles.

Data binding

Use context.BindValue(string bindingExpression) to obtain a value using the data binding functionality.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        var myvalue = await context.BindValue("#{elasticsearch:myvalue}");
        json.Data["boundValue"] = myvalue;
        return json;
    });

Similary, use context.SetValue(string path, object value) to set a value.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        await context.SetValue("elasticsearch:myvalue", "hello world");
        return json;
    });

Class Reference

Addin host globals:

{
    ITaskBuilder PipelineTask(string name);
    IServiceBuilder Service { get; }
}
interface ITaskBuilder
{
    /************ Async Json ************/
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<IProcessResult>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JsonData>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject[]>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<BinaryData>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<byte[]>> func);

    /************ Non async Json ************/
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, IProcessResult> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JsonData> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject[]> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, byte[]> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, BinaryData> func);

    /************ async Binary************/
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<IProcessResult>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JsonData>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject[]>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<BinaryData>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<byte[]>> func);

    /************ Non async Binary************/
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, IProcessResult> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JsonData> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject[]> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, byte[]> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, BinaryData> func);

    /************ Non async Dynamic ************/
    public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, dynamic> func);

    /************ async Dynamic ************/
    public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, Task<dynamic>> func);

    public ITaskBuilder WithDefaultConfigKey(string defaultConfigKey);
}

interface IServiceBuilder
{
    public IServiceBuilder Instance(string name, IService instance);
    public IServiceBuilder Register(string typeName, Type serviceType);
}
interface IScriptTaskContext
{
    IDictionary<string, string> Config { get; }
    ILogger Logger { get; }
    ILoggerFactory LoggerFactory { get; }
    IPipelineContext PipelineContext { get; }
    IServiceProvider ServiceProvider { get; }

    Task<string> BindValue(string bindingExpression);
    Task<string> GetExpandedConfigValue(string key, bool throwIfNotSet = true);
    IService GetService(string name);
    IRunTaskBuilder RunTask(string name);
    Task SetValue(string path, object val);
}
public static class ScriptTaskContextExtensions
{
    public static async Task<TService> GetService<TService>(this IScriptTaskContext context, 
        string defaultName, 
        string configKeyName = "service",
        bool throwIfNotFound = true) where TService : class;
    public static T GetAndConvertConfigValue<T>(this IScriptTaskContext context, string key, T defaultValue=default(T));
    public static string GetConfigValue(this IScriptTaskContext context, string key, bool throwIfNotSet=true);
}
public class JsonData : IProcessResultData
{
    public JsonData(JObject jsonData, JObject metadata=null);

    public JObject Metadata { get; }
    public JObject Data { get; }

    public static JsonData Empty { get; }
}

public class BinaryData : IProcessResultData
{
    public BinaryData(byte[] binaryData, JObject metadata = null);

    public JObject Metadata { get; }
    public byte[] Data { get; }
    public static BinaryData Empty { get; }
}